0%

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import math


def reward_function(params):
next_point = params['closest_waypoints'][1]
is_left_of_center = params['is_left_of_center']
speed = params['speed']

short_path_waypoint = list(range(48, 60))
is_short_path = (next_point in short_path_waypoint and not is_left_of_center) or (
next_point not in short_path_waypoint and is_left_of_center)
is_turning = next_point in list(range(22, 42))

if is_turning:
return math.sin(speed + 0.1) * 1.5 if is_short_path else math.sin(speed + 0.1)
return math.sin(speed - 1.25) * 1.25 if is_short_path else math.sin(speed - 1.25)

搭建交易执行层核心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import base64
import datetime
import hashlib
import hmac
import json
import time

import requests

base_url = "https://api.sandbox.gemini.com"
endpoint = "/v1/order/new"
url = base_url + endpoint

gemini_api_key = "account-zmidXEwP72yLSSybXVvn"
gemini_api_secret = "375b97HfE7E4tL8YaP3SJ239Pky9".encode()

t = datetime.datetime.now()
payload_nonce = str(int(time.mktime(t.timetuple()) * 1000))

payload = {
"request": "/v1/order/new",
"nonce": payload_nonce,
"symbol": "btcusd",
"amount": "5",
"price": "3633.00",
"side": "buy",
"type": "exchange limit",
"options": ["maker-or-cancel"]
}

encoded_payload = json.dumps(payload).encode()
b64 = base64.b64encode(encoded_payload)
signature = hmac.new(gemini_api_secret, b64, hashlib.sha384).hexdigest()

request_headers = {
'Content-Type': "text/plain",
'Content-Length': "0",
'X-GEMINI-APIKEY': gemini_api_key,
'X-GEMINI-PAYLOAD': b64,
'X-GEMINI-SIGNATURE': signature,
'Cache-Control': "no-cache"
}

response = requests.post(url,
data=None,
headers=request_headers)

new_order = response.json()
print(new_order)

行情数据对接和抓取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import copy
import json
import ssl
import time

import websocket


class OrderBook(object):
BIDS = 'bid'
ASKS = 'ask'

def __init__(self, limit=20):
self.limit = limit

# (price, amount)
self.bids = {}
self.asks = {}

self.bids_sorted = []
self.asks_sorted = []

def insert(self, price, amount, direction):
if direction == self.BIDS:
if amount == 0:
if price in self.bids:
del self.bids[price]
else:
self.bids[price] = amount
elif direction == self.ASKS:
if amount == 0:
if price in self.asks:
del self.asks[price]
else:
self.asks[price] = amount
else:
print('WARNING: unknown direction {}'.format(direction))

def sort_and_truncate(self):
# sort
self.bids_sorted = sorted([(price, amount) for price, amount in self.bids.items()], reverse=True)
self.asks_sorted = sorted([(price, amount) for price, amount in self.asks.items()])

# truncate
self.bids_sorted = self.bids_sorted[:self.limit]
self.asks_sorted = self.asks_sorted[:self.limit]

# copy back to bids and asks
self.bids = dict(self.bids_sorted)
self.asks = dict(self.asks_sorted)

def get_copy_of_bids_and_asks(self):
return copy.deepcopy(self.bids_sorted), copy.deepcopy(self.asks_sorted)


class Crawler:
def __init__(self, symbol, output_file):
self.order_book = OrderBook(limit=10)
self.output_file = output_file

self.ws = websocket.WebSocketApp('wss://api.gemini.com/v1/marketdata/{}'.format(symbol),
on_message=lambda ws, message: self.on_message(message))
self.ws.run_forever(sslopt={'cert_reqs': ssl.CERT_NONE})

def on_message(self, message):
data = json.loads(message)
for event in data['events']:
price, amount, direction = float(event['price']), float(event['remaining']), event['side']
self.order_book.insert(price, amount, direction)

self.order_book.sort_and_truncate()

with open(self.output_file, 'a+') as f:
bids, asks = self.order_book.get_copy_of_bids_and_asks()
output = {
'bids': bids,
'asks': asks,
'ts': int(time.time() * 1000)
}
f.write(json.dumps(output) + '\n')


if __name__ == '__main__':
crawler = Crawler(symbol='BTCUSD', output_file='BTCUSD.txt')

策略与回测系统

Utils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import os.path as path

import pandas as pd


def assert_msg(condition, msg):
if not condition:
raise Exception(msg)


def read_file(filename):
# 获得文件绝对路径
filepath = path.join(path.dirname(__file__), filename)
assert_msg(path.exists(filepath), "文件不存在")
return pd.read_csv(filepath,
index_col=0,
parse_dates=True,
infer_datetime_format=True)


def SMA(values, n):
"""
Simple Moving Average 简单移动均值
"""
return pd.Series(values).rolling(n).mean()


def crossover(series_1, series_2) -> bool:
"""
这俩指标,一个是小窗口的 SMA,一个是大窗口的 SMA;
如果小窗口的 SMA 从下面刺破或者穿过大窗口 SMA,那么说明,投资品的价格在短期内快速上涨,同时这个趋势很强烈,可能是一个买入的信号;
反之,如果大窗口的 SMA 从下方突破小窗口 SMA,那么说明,投资品的价格在短期内快速下跌,我们应该考虑卖出。
"""
return series_1[-2] < series_2[-2] and series_1[-1] > series_2[-1]

Strategy.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import abc
from typing import Callable

import numpy as np


class Strategy(metaclass=abc.ABCMeta):
def __init__(self, broker, data):
self._broker = broker
self._data = data
self._tick = 0

def I(self, func: Callable, *args) -> np.ndarray:
"""
计算买卖指标向量,用于判定这个时间点上需要进行"买"还是"卖"
"""
value = func(*args)
value = np.asarray(value)
assert_msg(value.shape[-1] == len(self._data.Close), "指标长度必须和 data 长度相同")
return value

@property
def tick(self):
return self._tick

@abc.abstractmethod
def init(self):
pass

@abc.abstractmethod
def next(self, tick):
"""
步进函数,执行第 tick 步的策略;tick 代表当前的"时间"
"""
pass

def buy(self):
self._broker.buy()

def sell(self):
self._broker.sell()

@property
def data(self):
return self._data


from Utils import assert_msg, crossover, SMA


class SmaCross(Strategy):
# 小窗口 SMA 的窗口大小,用于计算 SMA 快线
fast = 30

# 大窗口 SMA 的窗口大小,用于计算 SMA 慢线
slow = 90

def init(self):
# 计算历史上每个时刻的快线和慢线
self.sma1 = self.I(SMA, self.data.Close, self.fast)
self.sma2 = self.I(SMA, self.data.Close, self.slow)

def next(self, tick):
# 如果快线刚好越过慢线,买入全部
if crossover(self.sma1[:tick], self.sma2[:tick]):
self.buy()
# 如果慢线刚好越过快线,卖出全部
elif crossover(self.sma2[:tick], self.sma1[:tick]):
self.sell()
# 否则,不执行任何操作
else:
pass

Backtest.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from numbers import Number

import numpy as np
import pandas as pd

from Strategy import Strategy, SmaCross
from Utils import read_file, assert_msg


class ExchangeAPI:
def __init__(self, data, cash, commission):
assert_msg(0 < cash, "初始现金数量必须大于 0,输入的现金数量:{}".format(cash))
assert_msg(0 <= commission <= 0.05, "合理的手续费率一般不会超过 5%,输入的费率:{}".format(commission))
self._inital_cash = cash
self._data = data
self._commission = commission
self._position = 0
self._cash = cash
self._i = 0

@property
def cash(self):
"""
:return: 返回当前账户现金数量
"""
return self._cash

@property
def position(self):
"""
:return: 返回当前账户仓位
"""
return self._position

@property
def initial_cash(self):
return self._inital_cash

@property
def market_value(self):
return self._cash + self._position * self.current_price

@property
def current_price(self):
return self._data.Close[self._i]

def buy(self):
self._position = float(self._cash * (1 - self._commission) / self.current_price)
self._cash = 0.0

def sell(self):
self._cash += float(self._position * self.current_price * (1 - self._commission))
self._position = 0.0

def next(self, tick):
self._i = tick


class Backtest:
def __init__(self,
data: pd.DataFrame,
strategy_type: type(Strategy),
broker_type: type(ExchangeAPI),
cash: float = 10000,
commission: float = .0):
assert_msg(issubclass(strategy_type, Strategy), "strategy_type 不是 Strategy 类型")
assert_msg(issubclass(broker_type, ExchangeAPI), "broker_type 不是 ExchangeAPI 类型")
assert_msg(isinstance(commission, Number), "commission 不是浮点数值类型")

data = data.copy(False)

if 'Volume' not in data:
data['Volume'] = np.nan

# 检查缺失值
assert_msg(not data[['Open', 'High', 'Low', 'Close']].max().isnull().any(),
("部分 OHLC 包含缺失值,请去掉那些行或者通过差值填充"))

# 如果行情数据没有按照时间排序,进行重新排序
if not data.index.is_monotonic_increasing:
data = data.sort_index()

self._data = data
self._broker = broker_type(data, cash, commission)
self._strategy = strategy_type(self._broker, self._data)
self._results = None

def run(self) -> pd.Series:
strategy = self._strategy
broker = self._broker

strategy.init()

# 设定回测开始位置和结束位置
start = 100
end = len(self._data)

# 更新市场状态,然后再执行策略
for i in range(start, end):
# 注意要先把市场状态移动到第 i 个时刻
broker.next(i)
strategy.next(i)

self._results = self._compute_result(broker)
return self._results

def _compute_result(self, broker):
s = pd.Series()
s['初始市值'] = broker.initial_cash
s['结束市值'] = broker.market_value
s['收益'] = broker.market_value - broker.initial_cash
return s


def main():
BTCUSD = read_file('BTCUSD_GEMINI.csv')
ret = Backtest(BTCUSD, SmaCross, ExchangeAPI, 10000.0, 0.003).run()
print(ret)


if __name__ == '__main__':
main()

单线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import time
import requests

def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
for site in sites:
download_one(site)

def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography',
'https://en.wikipedia.org/wiki/Portal:Mathematics',
'https://en.wikipedia.org/wiki/Portal:Technology',
'https://en.wikipedia.org/wiki/Portal:Geography',
'https://en.wikipedia.org/wiki/Portal:Science',
'https://en.wikipedia.org/wiki/Computer_science',
'https://en.wikipedia.org/wiki/Python_(programming_language)',
'https://en.wikipedia.org/wiki/Java_(programming_language)',
'https://en.wikipedia.org/wiki/PHP',
'https://en.wikipedia.org/wiki/Node.js',
'https://en.wikipedia.org/wiki/The_C_Programming_Language',
'https://en.wikipedia.org/wiki/Go_(programming_language)'
]
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
main()

# Download 15 sites in 8.991675334 seconds

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import concurrent.futures
import time
import requests

def download_one(url):
resp = requests.get(url)
print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# futures = []
# for site in sites:
# future = executor.submit(download_one, site)
# futures.append(future)
# for future in concurrent.futures.as_completed(futures):
# future.result()
executor.map(download_one, sites)

def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography',
'https://en.wikipedia.org/wiki/Portal:Mathematics',
'https://en.wikipedia.org/wiki/Portal:Technology',
'https://en.wikipedia.org/wiki/Portal:Geography',
'https://en.wikipedia.org/wiki/Portal:Science',
'https://en.wikipedia.org/wiki/Computer_science',
'https://en.wikipedia.org/wiki/Python_(programming_language)',
'https://en.wikipedia.org/wiki/Java_(programming_language)',
'https://en.wikipedia.org/wiki/PHP',
'https://en.wikipedia.org/wiki/Node.js',
'https://en.wikipedia.org/wiki/The_C_Programming_Language',
'https://en.wikipedia.org/wiki/Go_(programming_language)'
]
start_time = time.perf_counter()
download_all(sites)
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
main()

# Download 15 sites in 3.591666209 seconds

多进程

1
2
3
4
5
def download_all(sites):
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(download_one, sites)

# Download 15 sites in 2.238637416 seconds

Asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import time
import aiohttp

async def download_one(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print('Read {} from {}'.format(resp.content_length, url))

async def download_all(sites):
tasks = [asyncio.create_task(download_one(site)) for site in sites]
await asyncio.gather(*tasks)

def main():
sites = [
'https://en.wikipedia.org/wiki/Portal:Arts',
'https://en.wikipedia.org/wiki/Portal:History',
'https://en.wikipedia.org/wiki/Portal:Society',
'https://en.wikipedia.org/wiki/Portal:Biography',
'https://en.wikipedia.org/wiki/Portal:Mathematics',
'https://en.wikipedia.org/wiki/Portal:Technology',
'https://en.wikipedia.org/wiki/Portal:Geography',
'https://en.wikipedia.org/wiki/Portal:Science',
'https://en.wikipedia.org/wiki/Computer_science',
'https://en.wikipedia.org/wiki/Python_(programming_language)',
'https://en.wikipedia.org/wiki/Java_(programming_language)',
'https://en.wikipedia.org/wiki/PHP',
'https://en.wikipedia.org/wiki/Node.js',
'https://en.wikipedia.org/wiki/The_C_Programming_Language',
'https://en.wikipedia.org/wiki/Go_(programming_language)'
]
start_time = time.perf_counter()
asyncio.run(download_all(sites))
end_time = time.perf_counter()
print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
main()

# Download 15 sites in 1.5994991669999998 seconds

如何选择

1
2
3
4
5
6
7
if io_bound:
if io_slow:
print('Use Asyncio')
else:
print('Use multi-threading')
elif cpu_bound:
print('Use multi-processing')

交叉验证

把获取的全部训练数据分成两份:一份用于测试,一份用于训练。然后用前者来评估模型。

回归问题的验证

\[ \frac{1}{n}\sum_{i=1}^{n}{(y^{(i)}-f_\theta(x^{(i)}))^2} \]

分类问题的验证

精度

\[ Accuracy = \frac{TP + TN}{TP + FP + FN + TN} \]

精确率

\[ Precision = \frac{TP}{TP + FP} \]

召回率

\[ Recall = \frac{TP}{TP + FN} \]

F 值

\[ Fmeasure = \frac{2}{\frac{1}{Precision} + \frac{1}{Recall}} \]

正则化

\[ \theta_0 := \theta_0 - \eta(\sum_{i=1}^{n}{(f_\theta(x^{(i)})-y^{(i)})x_j^{(i)}}) \]

\[ \theta_j := \theta_j - \eta(\sum_{i=1}^{n}{(f_\theta(x^{(i)})-y^{(i)})x_j^{(i)}}+\lambda\theta_j) \]

回归的正则化

\[ C(\theta) = \frac{1}{2}\sum_{i=1}^{n}{(y^{(i)}-f_\theta(x^{(i)}))^2} \]

\[ R(\theta) = \frac{\lambda}{2}\sum_{j=1}^{m}{\theta_j^2} \]

\[ E(\theta) = C(\theta) + R(\theta) \]

分类的正则化

\[ C(\theta) = -\sum_{i=1}^{n}{(y^{(i)}\log f_\theta(x^{(i)})+(1-y^{(i)})\log (1-f_\theta(x^{(i)})))} \]

\[ R(\theta) = \frac{\lambda}{2}\sum_{j=1}^{m}{\theta_j^2} \]

\[ E(\theta) = C(\theta) + R(\theta) \]

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import numpy as np
import matplotlib.pyplot as plt

# 真正的函数
def g(x):
return 0.1 * (x ** 3 + x ** 2 + x)

# 随意准备一些向真正的函数加入了一点噪声的训练数据
train_x = np.linspace(-2, 2, 8)
train_y = g(train_x) + np.random.randn(train_x.size) * 0.05

# 标准化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
return (x - mu) / sigma
train_z = standardize(train_x)

# 创建训练数据的矩阵
def to_matrix(x):
return np.vstack([
np.ones(x.size),
x,
x ** 2,
x ** 3,
x ** 4,
x ** 5,
x ** 6,
x ** 7,
x ** 8,
x ** 9,
x ** 10
]).T
X = to_matrix(train_z)

# 预测函数
def f(x):
return np.dot(x, theta)

# 目标函数
def E(x, y):
return 0.5 * np.sum((y - f(x)) ** 2)

# 参数初始化
theta = np.random.randn(X.shape[1])
# 正则化常量
LAMBDA = 0.5
# 学习率
ETA = 1e-4
# 误差
diff = 1
# 重复学习(不应用正则化)
error = E(X, train_y)
while diff > 1e-6:
theta = theta - ETA * (np.dot(f(X) - train_y, X))
current_error = E(X, train_y)
diff = error - current_error
error = current_error
theta1 = theta

# 重复学习(应用正则化)
theta = np.random.randn(X.shape[1])
diff = 1
error = E(X, train_y)
while diff > 1e-6:
reg_term = LAMBDA * np.hstack([0, theta[1:]])
theta = theta - ETA * (np.dot(f(X) - train_y, X) + reg_term)
current_error = E(X, train_y)
diff = error - current_error
error = current_error
theta2 = theta

# 绘图确认
plt.plot(train_z, train_y, 'o')
z = standardize(np.linspace(-2, 2, 100))
# 未应用正则化
theta = theta1
plt.plot(z, f(to_matrix(z)), linestyle='dashed')
# 应用正则化
theta = theta2
plt.plot(z, f(to_matrix(z)))
plt.show()

预测函数 - sigmoid 函数

\[ f_\theta(x) = \frac{1}{1 + exp(-\theta^Tx)} \]

决策边界

\[ y = \begin{cases} 1 & (\theta^Tx \geq 0)\\ 0 & (\theta^Tx < 0) \end{cases} \]

目标函数 - 似然函数

\[ L(\theta) = \prod_{i=1}^{n}{P(y^{(i)}=1|x^{(i)})^{y^{(i)}}P(y^{(i)}=0|x^{(i)})^{1-y^{(i)}}} \]

对数似然函数

\[ \log L(\theta) = \sum_{i=1}^{n}{(y^{(i)}\log f_\theta(x^{(i)})+(1-y^{(i)})\log (1-f_\theta(x^{(i)})))} \]

参数更新表达式

\[ \theta_j := \theta_j - \eta\sum_{i=1}^{n}{(f_\theta(x^{(i)}) - y^{(i)})x_j^{(i)}} \]

线性可分问题

\[ \theta^Tx = \theta_0x_0 + \theta_1x_1 + \theta_2x_2 = \theta_0 + \theta_1x_1 + \theta_2x_2 = 0 \]

\[ x_2 = -\frac{\theta_0 + \theta_1x_1}{\theta_2} \]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import numpy as np
import matplotlib.pyplot as plt

# 训练数据
train = np.loadtxt('images2.csv', delimiter=',', skiprows=1)
train_x = train[:, 0:2]
train_y = train[:, 2]

# 参数初始化
theta = np.random.rand(3)

# 标准化
mu = train_x.mean(axis=0)
sigma = train_x.std(axis=0)
def standardize(x):
return (x - mu) / sigma
train_z = standardize(train_x)

# 增加 x0
def to_matrix(x):
x0 = np.ones([x.shape[0], 1])
return np.hstack([x0, x])
X = to_matrix(train_z)

# sigmoid 函数
def f(x):
return 1 / (1 + np.exp(-np.dot(x, theta)))

# 学习率
ETA = 1e-3
# 重复次数
epoch = 5000
# 更新次数
count = 0
# 重复学习
for _ in range(epoch):
theta = theta - ETA * np.dot(f(X) - train_y, X)
# 日志输出
count += 1
print('第 {} 次 : theta = {}'.format(count, theta))

# 绘图确认
plt.plot(train_z[train_y == 1, 0], train_z[train_y == 1, 1], 'o')
plt.plot(train_z[train_y == 0, 0], train_z[train_y == 0, 1], 'x')
x1 = np.linspace(-2, 2, 100)
plt.plot(x1, -(theta[0] + theta[1] * x1) / theta[2], linestyle='dashed')
plt.show()

线性不可分问题

\[ \theta^Tx = \theta_0x_0 + \theta_1x_1 + \theta_2x_2 + \theta_3x_1^2 = \theta_0 + \theta_1x_1 + \theta_2x_2 + \theta_3x_1^2 = 0 \]

\[ x_2 = -\frac{\theta_0 + \theta_1x_1 + \theta_3x_1^2}{\theta_2} \]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import numpy as np
import matplotlib.pyplot as plt

# 训练数据
train = np.loadtxt('data3.csv', delimiter=',', skiprows=1)
train_x = train[:, 0:2]
train_y = train[:, 2]

# 参数初始化
theta = np.random.rand(4)

# 标准化
mu = train_x.mean(axis=0)
sigma = train_x.std(axis=0)
def standardize(x):
return (x - mu) / sigma
train_z = standardize(train_x)

# 增加 x0 和 x3
def to_matrix(x):
x0 = np.ones([x.shape[0], 1])
x3 = x[:, 0, np.newaxis] ** 2
return np.hstack([x0, x, x3])
X = to_matrix(train_z)

# sigmoid 函数
def f(x):
return 1 / (1 + np.exp(-np.dot(x, theta)))

# 学习率
ETA = 1e-3
# 重复次数
epoch = 5000
# 更新次数
count = 0
# 重复学习
for _ in range(epoch):
theta = theta - ETA * np.dot(f(X) - train_y, X)
# 日志输出
count += 1
print('第 {} 次 : theta = {}'.format(count, theta))

# 绘图确认
plt.plot(train_z[train_y == 1, 0], train_z[train_y == 1, 1], 'o')
plt.plot(train_z[train_y == 0, 0], train_z[train_y == 0, 1], 'x')
x1 = np.linspace(-2, 2, 100)
x2 = -(theta[0] + theta[1] * x1 + theta[3] * x1 ** 2) / theta[2]
plt.plot(x1, x2, linestyle='dashed')
plt.show()

随机梯度下降法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import numpy as np
import matplotlib.pyplot as plt

# 训练数据
train = np.loadtxt('data3.csv', delimiter=',', skiprows=1)
train_x = train[:, 0:2]
train_y = train[:, 2]

# 参数初始化
theta = np.random.rand(4)

# 标准化
mu = train_x.mean(axis=0)
sigma = train_x.std(axis=0)
def standardize(x):
return (x - mu) / sigma
train_z = standardize(train_x)

# 增加 x0 和 x3
def to_matrix(x):
x0 = np.ones([x.shape[0], 1])
x3 = x[:, 0, np.newaxis] ** 2
return np.hstack([x0, x, x3])
X = to_matrix(train_z)

# sigmoid 函数
def f(x):
return 1 / (1 + np.exp(-np.dot(x, theta)))

# 学习率
ETA = 1e-3
# 重复次数
epoch = 5000
# 更新次数
count = 0
# 重复学习
for _ in range(epoch):
# 使用随机梯度下降法更新参数
p = np.random.permutation(X.shape[0])
for x, y in zip(X[p, :], train_y[p]):
theta = theta - ETA * (f(x) - y) * x
# 日志输出
count += 1
print('第 {} 次 : theta = {}'.format(count, theta))

# 绘图确认
plt.plot(train_z[train_y == 1, 0], train_z[train_y == 1, 1], 'o')
plt.plot(train_z[train_y == 0, 0], train_z[train_y == 0, 1], 'x')
x1 = np.linspace(-2, 2, 100)
x2 = -(theta[0] + theta[1] * x1 + theta[3] * x1 ** 2) / theta[2]
plt.plot(x1, x2, linestyle='dashed')
plt.show()